Pinvon's Blog

所见, 所闻, 所思, 所想

多线程

Python 中的线程

Python 代码的执行是由 Python 虚拟机(解释器主循环)控制的, Python 解释器中可以运行多个线程, 但在任意给定时刻, 只有一个线程会被解释器执行. Python 使用全局解释器锁(GIL)来保证这一点. 执行步骤如下:

  1. 设置 GIL;
  2. 切换到一个线程, 执行;
  3. 执行指令或主动让出控制权(time.sleep(0));
  4. 切换到另一个线程;
  5. 解锁 GIL;
  6. 重复上述步骤.

thread 模块

thread 模块的常用函数如下:

start_new_thread(func, args, kwargs=None) 派生一个新线程, 使用给定的 args 和可选的 kwargs 来执行 func
allocate_lock() 分配 LockType 锁对象
exit() 退出线程

LockType 是 thread 模块提供的同步数据结构, 也叫原语锁, 简单锁, 互斥锁等, 常用方法如下:

acquire(wait=None) 深度获取锁对象
locked() 获取锁对象返回 True, 未获取返回 False
release() 释放锁

例子

#!/usr/bin/env python
import thread
from time import sleep,ctime

loops = [4,2]

def loop(nloop,nsec,lock):
    print 'start loop', nloop, 'at:',ctime()
    sleep(nsec)
    print 'loop',nloop,'done at:',ctime()
    lock.release()

def main():
    print'starting at:',ctime()
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        lock = thread.allocate_lock()
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        thread.start_new_thread(loop,(i,loops[i],locks[i]))

    for i in nloops:
        while locks[i].locked():pass

    print 'all done at:', ctime()

if __name__ == '__main__':
    main()

main() 中使用了 3 个独立的 for 循环.

第 1 个 for 循环:

  • 使用 thread.allocate_lock() 分配锁, 得到锁对象;
  • 通过 acquire() 取得锁(相当于上锁);
  • 将锁添加到锁列表 locks 中.

第 2 个 for 循环: 派生线程, 每个线程调用 loop(), 传递循环号, 睡眠时间, 用于该线程的锁. 不在上锁的时候就启动线程的原因:

  • 想让所有的线程几乎同时开始执行;
  • 获取锁需要时间, 有可能线程在获取锁之前就结束了.

第 3 个 for 循环: 不断测试是否获取了锁对象, 当线程尚未释放锁时返回 True, 会继续测试, 当线程释放后返回 False, 结束测试; 一般会优先检查第一个锁, 只有第一个锁释放之后, 才会继续检查剩下的锁; 当所有锁都被释放时, 主线程才结束.

threading 模块

threading 模块中的可用对象如下表:

Thread 表示一个执行线程的对象
Lock 锁原语对象
RLock 可重入锁, 使单一线程可再次获得已持有的锁
Condition 条件变量对象, 使得一个线程等待另一个线程满足特定的"条件", 如改变状态或某个数据值
Event 条件变量的通用版本, 任意数量的线程等待某个事件的发生, 在该事件发生后所有线程将被激活
Semaphore 为线程间共享的有限资源提供了一个计数器, 如果没有可用资源时会被阻塞
BoundedSemaphore 与 Semaphore 相似, 不过它不允许超过初始值
Timer 与 Thread 相似, 不过它要在运行前等待一段时间
Barrier 创建一个"障碍", 必须达到指定数量的线程后才可以继续

Thread 类

Thread.daemon 属性, 用来判断该线程是否是守护线程, 只有设置了守护线程, 主线程才会等待其他线程结束.

例子

#!/usr/bin/env python
import threading
from time import sleep,ctime

loops = [4,2]

def loop(nloop,nsec,lock):
    print 'start loop', nloop, 'at:',ctime()
    sleep(nsec)
    print 'loop',nloop,'done at:',ctime()
	# lock.release()  # 不再需要

def main():
    print'starting at:',ctime()
    threads = []  # 替换 locks = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))  # 替换 thread.allocate_lock()
        threads.append(t)

    for i in nloops:
		threads[i].start()  # 替换 thread.start_new_thread(loop,(i,loops[i],locks[i]))

    for i in nloops:
		threads[i].join()  # 替换 while locks[i].locked():pass

    print 'all done at:', ctime()

if __name__ == '__main__':
    main()

最重要的修改在于, Thread 类使用 start() 开始线程, 使用 join() 等待线程结束.

更加接近面向对象的多线程编程

import threading
from time import sleep,ctime
 
loops = [4,2]
 
class ThreadFunc(object):
    def __init__(self,func,args,name=''):
        self.name = name
        self.func = func
        self.args = args
    def __call__(self):
        self.func(*self.args)
 
def loop(nloop,nsec):
    print('start loop',nloop,'at:',ctime())
    sleep(nsec)
    print('loop',nloop,'done at:',ctime())
 
def main():
    print('starting at:',ctime())
    threads = []
    nloops = list(range(len(loops)))
 
    for i in nloops:
        t = threading.Thread(
            target=ThreadFunc(loop,(i,loops[i]),
                loop.__name__))
        threads.append(t)
 
    for i in nloops: #开始多线程
        threads[i].start()
 
    for i in nloops: #等待所有线程完成
        threads[i].join()
 
    print('all done at:',ctime())
 
if __name__ == '__main__':
	main()

主要是添加了 ThreadFunc 类, 在实例化 Thread 对象时进行了改动, 同时实例化了可调用类 ThreadFunc. 创建新线程时, Thread 类的代码将调用 ThreadFunc 对象, 此时会调用 __call__() 这个方法.

这种方法更加通用, 不局限于 loop() 函数.

最终方案: 使用 Thread 的派生类

import threading
from time import sleep,ctime
 
loops = (4,2)
 
class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
 
    def run(self):
        self.func(*self.args)
 
def loop(nloop,nsec):
    print('start loop',nloop,'at:',ctime())
    sleep(nsec)
    print('loop',nloop,'done at:',ctime())
 
def main():
    print('starting at:',ctime())
    threads = []
    nloops = list(range(len(loops)))
 
    for i in nloops:
        t = MyThread(loop,(i,loops[i]),loop.__name__)
        threads.append(t)
 
    for i in nloops: #开始多线程
        threads[i].start()
 
    for i in nloops: #等待所有线程完成
        threads[i].join()
 
    print('all done at:',ctime())
 
if __name__ == '__main__':
    main()
  • 子类的构造函数必须先调用基类的构造函数;

主要变化:

  • __call__() 在子类中必须要改成 run().

小结

推荐使用 threading 模块, 而不是 thread 模块.

threading 模块更加先进, 有更好的线程支持, thread 模块的同步原语(锁)很少, 而 threading 模块则很多.

thread 模块对于进程何时退出没有控制, 当主线程结束时, 其他所有线程也都强制结束, 不会发出警告或者进行适当的清理, 而 threading 模块能确保重要的子线程在进程退出前结束.

在 Python3 中, thread 模块被重命名为 _thread 模块.

Comments

使用 Disqus 评论
comments powered by Disqus